package com.lwl.nio.select;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;

/**
 * Variant of {@link SelectSockets} that hands read-ready channels to a small,
 * fixed-size pool of worker threads instead of draining them inside
 * {@link #readDataFromSocket(SelectionKey)} itself.
 *
 * <p>While a worker is servicing a channel, OP_READ is removed from the key's
 * interest set so the selector ignores the channel; the worker re-enables
 * OP_READ and wakes the selector once the channel has been drained.
 */
public class SelectSocketsThreadPool extends SelectSockets {

	private static final int MAX_THREADS = 5;
	private final ThreadPool pool = new ThreadPool(MAX_THREADS);

	public static void main(String[] argv) throws Exception {
		new SelectSocketsThreadPool().go();
	}

	/**
	 * Dispatches the read-ready channel behind {@code key} to an idle worker.
	 * If no worker is idle the call returns without doing anything; OP_READ
	 * is left enabled on the key in that case.
	 *
	 * @param key the selection key whose channel has data to read
	 * @throws IOException declared by the overridden method; not thrown here
	 */
	@Override
	protected void readDataFromSocket(SelectionKey key) throws IOException {
		WorkerThread worker = pool.getWorker();
		if (worker == null) {
			// No threads available. Do nothing. The key keeps its OP_READ
			// interest, so the channel stays selectable until a worker is
			// free. This design could be improved.
			return;
		}
		try {
			// Invoking this wakes up the worker thread, then returns
			worker.serviceChannel(key);
		} catch (RuntimeException e) {
			// The hand-off failed (e.g. the key was cancelled). Put the
			// worker back so the pool does not shrink, then propagate the
			// failure exactly as before.
			pool.returnWorker(worker);
			throw e;
		}
	}

	/**
	 * A very simple pool of pre-started worker threads. Access to the idle
	 * list is guarded by synchronizing on the list itself.
	 */
	private static class ThreadPool {
		final LinkedList<WorkerThread> idle = new LinkedList<>();

		/**
		 * Creates and starts {@code poolSize} workers and registers each of
		 * them as idle.
		 *
		 * @param poolSize number of worker threads to create
		 */
		public ThreadPool(int poolSize) {
			for (int i = 0; i < poolSize; i++) {
				WorkerThread thread = new WorkerThread(this);
				thread.setName("Worker" + (i + 1));
				thread.start();
				idle.add(thread);
			}
		}

		/**
		 * Removes and returns the first idle worker.
		 *
		 * @return an idle worker, or {@code null} if none is available
		 */
		WorkerThread getWorker() {
			synchronized (idle) {
				// poll() yields null on an empty list
				return idle.poll();
			}
		}

		/**
		 * Puts a worker back at the end of the idle list.
		 *
		 * @param worker the worker that has finished its unit of work
		 */
		void returnWorker(WorkerThread worker) {
			synchronized (idle) {
				idle.add(worker);
			}
		}
	}

	/**
	 * Worker thread that sleeps until it is handed a key via
	 * {@link #serviceChannel(SelectionKey)}, echoes the channel's pending
	 * data back to it, and then returns itself to its pool.
	 */
	private static class WorkerThread extends Thread {
		private final ByteBuffer buffer = ByteBuffer.allocate(1024);
		private final ThreadPool pool;
		// Key currently assigned to this worker; null while idle.
		// Read and written only while holding this thread object's monitor.
		private SelectionKey key;

		public WorkerThread(ThreadPool pool) {
			this.pool = pool;
		}

		/**
		 * Loops forever: waits for a key, drains its channel, returns to the
		 * pool. The monitor is held for the whole loop except while blocked
		 * in {@code wait()}.
		 */
		@Override
		public synchronized void run() {
			System.out.println(this.getName() + " is ready");
			while (true) {
				// Guarded wait: only sleep while there is no work. Checking
				// the condition BEFORE waiting means a key handed over before
				// this thread first reached wait() is not lost, and spurious
				// wakeups simply go back to sleep.
				while (key == null) {
					try {
						this.wait();
					} catch (InterruptedException e) {
						// wait() has already cleared the interrupt status;
						// report it and keep serving.
						e.printStackTrace();
					}
				}
				System.out.println(this.getName() + " has been awakened");

				try {
					drainChannel(key);
				} catch (Exception e) {
					System.out.println("Caught '" + e + "' closing channel");
					try {
						key.channel().close();
					} catch (IOException ex) {
						ex.printStackTrace();
					}
					key.selector().wakeup();
				}
				key = null;
				this.pool.returnWorker(this);
			}
		}

		/**
		 * Called to initiate a unit of work by this worker thread on the
		 * provided SelectionKey object. This method is synchronized, as is the
		 * run( ) method, so only one key can be serviced at a given time.
		 * Before waking the worker thread, and before returning to the main
		 * selection loop, this key's interest set is updated to remove OP_READ.
		 * This will cause the selector to ignore read-readiness for this
		 * channel while the worker thread is servicing it.
		 *
		 * @param key the key whose channel this worker should drain
		 */
		synchronized void serviceChannel(SelectionKey key) {
			// Touch the interest set first: if this throws (cancelled key),
			// the worker is left without an assigned key and can be reused.
			key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
			this.key = key;
			this.notify();
		}

		/**
		 * The actual code which drains the channel associated with the given
		 * key. This method assumes the key has been modified prior to
		 * invocation to turn off selection interest in OP_READ. Everything
		 * read is written straight back to the same channel. When this
		 * method completes it re-enables OP_READ and calls wakeup( ) on the
		 * selector so the selector will resume watching this channel. If the
		 * read reports end-of-stream the channel is closed instead.
		 *
		 * @param key the key whose channel is to be drained
		 * @throws IOException if reading, writing or closing the channel fails
		 */
		void drainChannel(SelectionKey key) throws IOException {
			SocketChannel channel = (SocketChannel) key.channel();
			int count;
			buffer.clear();
			// Loop while data is available; channel is nonblocking
			while ((count = channel.read(buffer)) > 0) {
				buffer.flip();
				// Keep writing until the whole chunk has been echoed
				while (buffer.hasRemaining()) {
					channel.write(buffer);
				}
				buffer.clear();
			}
			if (count < 0) {
				// End-of-stream: close the channel; nothing to re-arm
				channel.close();
				return;
			}
			key.interestOps(key.interestOps() | SelectionKey.OP_READ);
			key.selector().wakeup();
		}

	}

}
